package com.yootk.rockemq;

import com.yootk.rockemq.util.MessageListSplitter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class MessageBatchSplitProducer { // 消息生产者
    // 如果此时你使用的是集群服务，则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制，所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        // 3、如果要想进行批处理的分配，一般都是通过List集合保存批量消息数据的
        List<Message> messages = new ArrayList<>(); // 保存批处理消息
        for (int x = 0; x < 1000; x++) { // 循环数据的配置
            Message msg = new Message(TOPIC, "沐言科技：www.yootk.com"
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            messages.add(msg); // 将消息保存在List集合之中
        }
        MessageListSplitter messageListSplitter = new MessageListSplitter(messages);
        while(messageListSplitter.hasNext()) { // 每次只取出部分的集合
            SendResult result = producer.send(messageListSplitter.next()); // 消息批处理发送
            System.out.printf("【消息批量发送】发送状态：%s %n", result.getSendStatus()); // 获取发送的结果
        }
        producer.shutdown(); // 关闭生产者
    }
}